-
Notifications
You must be signed in to change notification settings - Fork 513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add java priority queue, set, deque, collection coders #5520
base: main
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5520 +/- ##
=======================================
Coverage 61.42% 61.43%
=======================================
Files 312 312
Lines 11104 11117 +13
Branches 757 753 -4
=======================================
+ Hits 6821 6830 +9
- Misses 4283 4287 +4 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see we're also missing coders for java.util.Collection[T]
, java.util.Set[T]
and java.util.Deque[T]
which are available by default in beam
// neither arrays nor PriorityQueues are consistentWithEquals | ||
Coder.xmap(ScalaCoders.arrayCoder[T])( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to create a coder instance, explicitly overriding consistentWithEquals
instead of depending on a specific type for that reason
I recall we've been comparing underlying coder and value ordering here, but we've not done that for scala The drawback of such check is that we must give an ordering implementation that gives a stable equal after serialization (mostly by using object ordering). |
pq coderShould roundtrip() and | ||
beOfType[Transform[_, _]] and | ||
materializeToTransformOf[ArrayCoder[_]] and | ||
beFullyCompliantNotConsistentWithEquals() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this be non deterministic too ?
// custom ordering must have stable equal after serialization | ||
implicit val pqOrd: Ordering[String] = FlippedStringOrdering |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really happy about this
When using e.g. algebird
PriorityQueueMonoid
, scio needs to serialize java PriorityQueues. If Kryo is used, especially for small queues, there can be significant overhead.Instead, this PR provides a PriorityQueue coder backed by a scala
Ordering
(as indeed the monoid is).The coder must be created explicitly by the user so that they are asserting that the comparator of the original PriorityQueues and the reconstituted ones are the same. e.g.
@RustedBones Is there some trick I can use to get an error message when
Coder[java.util.PriorityQueue[T]]
fails to be derived (that's not just the generic implicitNotFound onCoder
)?